/*
Copyright 2021 The OpenYurt Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package preflight

import (
	"bytes"
	"fmt"
	"io"
	"net"
	"os"
	"strings"
	"sync"
	"time"

	"github.com/pkg/errors"
	batchv1 "k8s.io/api/batch/v1"
	v1 "k8s.io/api/core/v1"
	"k8s.io/apimachinery/pkg/util/sets"
	"k8s.io/client-go/kubernetes"
	"k8s.io/klog/v2"
	utilsexec "k8s.io/utils/exec"

	nodeutil "github.com/openyurtio/openyurt/pkg/controller/util/node"
	"github.com/openyurtio/openyurt/pkg/node-servant/components"
	"github.com/openyurtio/openyurt/pkg/projectinfo"
	kubeutil "github.com/openyurtio/openyurt/pkg/yurtadm/util/kubernetes"
)

// Error carries the aggregated report of the preflight checks that failed fatally.
type Error struct {
	// Msg is the already formatted list of fatal check failures.
	Msg string
}

// Error implements the standard error interface. The result is Msg placed
// between a fixed header line and a hint about `--ignore-preflight-errors`.
func (e *Error) Error() string {
	const (
		header = "[preflight] Some fatal errors occurred:\n"
		hint   = "[preflight] If you know what you are doing, you can make a check non-fatal with `--ignore-preflight-errors=...`\n"
	)
	return fmt.Sprintf("%s%s%s", header, e.Msg, hint)
}

// Preflight identifies this error as a preflight error; it always returns true.
func (e *Error) Preflight() bool {
	return true
}

// Checker validates the state of the system to ensure kubeadm will be
// successful as often as possible.
type Checker interface {
	// Check runs the validation and returns the non-fatal warnings and the
	// errors it found; both may be nil.
	Check() (warnings, errorList []error)
	// Name returns the identifier of the check. RunChecks prints it with every
	// warning/error and matches its lower-cased form against the ignore set.
	Name() string
}

// IsPrivilegedUserCheck verifies that the current user is privileged (root on linux).
// A windows-specific check has not been implemented yet.
type IsPrivilegedUserCheck struct{}

// Name returns the fixed identifier of this check.
func (IsPrivilegedUserCheck) Name() string {
	return "IsPrivilegedUser"
}

// Check returns an error unless the process runs with uid 0. It never
// produces warnings.
func (ipuc IsPrivilegedUserCheck) Check() (warnings, errorList []error) {
	if uid := os.Getuid(); uid == 0 {
		return nil, nil
	}
	return nil, []error{errors.New("user is not running as root")}
}

// NodeReadyCheck checks whether every node in the list is in the Ready state.
type NodeReadyCheck struct {
	// NodeLst is the list of nodes to inspect.
	NodeLst *v1.NodeList
}

// Name returns the fixed identifier of this check.
func (NodeReadyCheck) Name() string {
	return "NodeReady"
}

// Check collects the names of all nodes for which isNodeReady reports false
// and returns a single error naming them; it returns no error when every
// node is ready. It never produces warnings.
func (nrc NodeReadyCheck) Check() (warnings, errorList []error) {
	klog.V(1).Infoln("validating node status")

	var unready []string
	nodes := nrc.NodeLst.Items
	for i := range nodes {
		if isNodeReady(&nodes[i].Status) {
			continue
		}
		unready = append(unready, nodes[i].Name)
	}

	if len(unready) == 0 {
		return nil, nil
	}
	return nil, []error{errors.Errorf("the status of nodes: %s is not 'Ready'", unready)}
}

// NodeEdgeWorkerLabelCheck checks whether any node already carries the
// edgeWorker label, which marks it as an OpenYurt node.
type NodeEdgeWorkerLabelCheck struct {
	// NodeLst is the list of nodes to inspect.
	NodeLst *v1.NodeList
}

// Name returns the fixed identifier of this check.
func (NodeEdgeWorkerLabelCheck) Name() string {
	return "NodeEdgeWorkerLabel"
}

// Check returns a single error naming every node whose labels contain the
// edgeWorker label key, regardless of the label value. It never produces
// warnings.
func (nlc NodeEdgeWorkerLabelCheck) Check() (warnings, errorList []error) {
	klog.V(1).Infoln("validating node edgeworker label")

	var labeled []string
	nodes := nlc.NodeLst.Items
	for i := range nodes {
		_, found := nodes[i].Labels[projectinfo.GetEdgeWorkerLabelKey()]
		if !found {
			continue
		}
		labeled = append(labeled, nodes[i].Name)
	}

	if len(labeled) == 0 {
		return nil, nil
	}
	return nil, []error{errors.Errorf("the nodes %s has already been labeled as a OpenYurt node", labeled)}
}

// JobExistCheck checks whether jobs whose name starts with a given prefix exist.
type JobExistCheck struct {
	// JobLst is the list of jobs to inspect.
	JobLst *batchv1.JobList
	// Prefix is the job name prefix regarded as conflicting.
	Prefix string
	// Label, when non-empty, overrides the generated check name.
	Label string
}

// Name returns Label if it is set, otherwise "JobExist-" followed by Prefix.
func (jc JobExistCheck) Name() string {
	if jc.Label == "" {
		return fmt.Sprintf("JobExist-%s", jc.Prefix)
	}
	return jc.Label
}

// Check returns a single error naming every job whose name starts with
// Prefix, or no error when there is none. It never produces warnings.
func (jc JobExistCheck) Check() (warnings, errorList []error) {
	klog.V(1).Infoln("validating convert jobs")

	var conflicting []string
	jobs := jc.JobLst.Items
	for i := range jobs {
		if name := jobs[i].Name; strings.HasPrefix(name, jc.Prefix) {
			conflicting = append(conflicting, name)
		}
	}

	if len(conflicting) == 0 {
		return nil, nil
	}
	return nil, []error{errors.Errorf("jobs %s has prefix %s, may conflict with the conversion job name", conflicting, jc.Prefix)}
}

// NodeServantJobCheck creates jobs to do preflight checks.
// After a job is successfully executed, it will be deleted.
// A failed job will not be deleted, and the user needs to delete it manually.
type NodeServantJobCheck struct {
	cliSet *kubernetes.Clientset
	jobLst []*batchv1.Job

	// Both durations are handed to kubeutil.RunJobAndCleanup unchanged.
	waitServantJobTimeout time.Duration
	checkServantJobPeriod time.Duration
}

// Name returns the fixed identifier of this check.
func (NodeServantJobCheck) Name() string {
	return "NodeServantJob"
}

// Check runs every job of jobLst concurrently through
// kubeutil.RunJobAndCleanup, waits for all of them to return, and reports one
// error per failed job. The order of the returned errors is unspecified. It
// never produces warnings.
func (nc NodeServantJobCheck) Check() (warnings []error, errorList []error) {
	// Buffered for the worst case (every job fails) so no goroutine blocks on send.
	failures := make(chan error, len(nc.jobLst))

	var pending sync.WaitGroup
	for _, j := range nc.jobLst {
		pending.Add(1)
		// Each goroutine works on its own copy of the job object.
		go func(servant batchv1.Job) {
			defer pending.Done()

			runErr := kubeutil.RunJobAndCleanup(nc.cliSet, &servant,
				nc.waitServantJobTimeout, nc.checkServantJobPeriod, false)
			if runErr == nil {
				klog.V(1).Infof("servant job(%s) has succeeded\n", servant.GetName())
				return
			}
			failures <- fmt.Errorf("fail to run servant job(%s): %w\n", servant.GetName(), runErr)
		}(*j)
	}

	pending.Wait()
	close(failures)

	for failure := range failures {
		errorList = append(errorList, failure)
	}
	return nil, errorList
}

// FileExistingCheck checks that the given file exists.
type FileExistingCheck struct {
	// Path is the file to look for.
	Path string
	// Label, when non-empty, overrides the generated check name.
	Label string
}

// Name returns Label if it is set. Otherwise the name is derived from Path:
// "FileExisting-" followed by Path with every "/" replaced by "-".
func (fac FileExistingCheck) Name() string {
	if fac.Label == "" {
		flattened := strings.ReplaceAll(fac.Path, "/", "-")
		return fmt.Sprintf("FileExisting-%s", flattened)
	}
	return fac.Label
}

// Check stats Path and returns a "doesn't exist" error whenever the stat call
// fails, whatever the reason of the failure. It never produces warnings.
func (fac FileExistingCheck) Check() (warnings, errorList []error) {
	klog.V(1).Infof("validating the existence of file %s", fac.Path)

	_, statErr := os.Stat(fac.Path)
	if statErr == nil {
		return nil, nil
	}
	return nil, []error{errors.Errorf("%s doesn't exist", fac.Path)}
}

// FileAtLeastOneExistingCheck checks if at least one file exists in the file list.
// After a file is found, the remaining files will not be checked.
type FileAtLeastOneExistingCheck struct {
	// Paths lists the candidate files.
	Paths []string
	// Label, when non-empty, overrides the generated check name.
	Label string
}

// Name returns Label if it is set. Otherwise the name is
// "FileAtLeastOneExistingCheck-" followed by the first entry of Paths, or the
// bare "FileAtLeastOneExistingCheck" when Paths is empty.
func (foc FileAtLeastOneExistingCheck) Name() string {
	if foc.Label != "" {
		return foc.Label
	}
	// Guard against an empty list: indexing Paths[0] would panic.
	if len(foc.Paths) == 0 {
		return "FileAtLeastOneExistingCheck"
	}
	return fmt.Sprintf("FileAtLeastOneExistingCheck-%s", foc.Paths[0])
}

// Check stats the entries of Paths in order and succeeds as soon as one stat
// call succeeds. If none does (including when Paths is empty) it returns a
// single error. It never produces warnings.
func (foc FileAtLeastOneExistingCheck) Check() (warnings, errorList []error) {
	klog.V(1).Infof("validating if at least one file exists in the file list: %s", foc.Paths)

	for i := range foc.Paths {
		_, statErr := os.Stat(foc.Paths[i])
		if statErr != nil {
			continue
		}
		return nil, nil
	}
	return nil, []error{errors.Errorf("no file in list %s exists", foc.Paths)}
}

// DirExistingCheck checks that the given directory exists and is not empty.
type DirExistingCheck struct {
	// Path is the directory to look for.
	Path string
	// Label, when non-empty, overrides the generated check name.
	Label string
}

// Name returns Label if it is set. Otherwise the name is derived from Path:
// "DirExisting-" followed by Path with every "/" replaced by "-".
func (dac DirExistingCheck) Name() string {
	if dac.Label == "" {
		flattened := strings.ReplaceAll(dac.Path, "/", "-")
		return fmt.Sprintf("DirExisting-%s", flattened)
	}
	return dac.Label
}

// Check validates that Path exists, is a directory, and contains at least one
// entry. It returns an error when
//   - Path does not exist,
//   - Path cannot be inspected or opened,
//   - Path is not a directory, or
//   - the directory is empty or its entries cannot be read.
//
// It never produces warnings.
func (dac DirExistingCheck) Check() (warnings, errorList []error) {
	klog.V(1).Infof("validating the existence of directory %s", dac.Path)

	info, err := os.Stat(dac.Path)
	if os.IsNotExist(err) {
		return nil, []error{errors.Errorf("%s doesn't exist", dac.Path)}
	}
	if err != nil {
		return nil, []error{errors.Wrapf(err, "unable to check if %s is empty", dac.Path)}
	}
	// A regular file can be opened as well, but listing its entries fails with
	// a non-EOF error; reject it explicitly instead of letting it pass.
	if !info.IsDir() {
		return nil, []error{errors.Errorf("%s is not a directory", dac.Path)}
	}

	f, err := os.Open(dac.Path)
	if err != nil {
		return nil, []error{errors.Wrapf(err, "unable to check if %s is empty", dac.Path)}
	}
	defer f.Close()

	// Asking for a single entry is enough: io.EOF means there is none.
	if _, err = f.Readdirnames(1); err != nil {
		if err == io.EOF {
			return nil, []error{errors.Errorf("%s is empty", dac.Path)}
		}
		return nil, []error{errors.Wrapf(err, "unable to check if %s is empty", dac.Path)}
	}
	return nil, nil
}

// PortOpenCheck ensures the given port is available for use.
type PortOpenCheck struct {
	// port is the TCP port to probe.
	port int
	// label, when non-empty, overrides the generated check name.
	label string
}

// Name returns label if it is set, otherwise "Port-" followed by the port number.
func (poc PortOpenCheck) Name() string {
	if poc.label == "" {
		return fmt.Sprintf("Port-%d", poc.port)
	}
	return poc.label
}

// Check tries to listen on the TCP port on all interfaces. A failure to
// listen is reported as an "in use" error; a failure to close the probe
// listener afterwards is only reported as a warning.
func (poc PortOpenCheck) Check() (warnings, errorList []error) {
	klog.V(1).Infof("validating availability of port %d", poc.port)

	addr := fmt.Sprintf(":%d", poc.port)
	listener, listenErr := net.Listen("tcp", addr)
	if listenErr != nil {
		errorList = []error{errors.Errorf("Port %d is in use", poc.port)}
	}
	if listener == nil {
		return warnings, errorList
	}

	// Release the port again; the listener was only a probe.
	if closeErr := listener.Close(); closeErr != nil {
		warnings = append(warnings, errors.Errorf("when closing port %d, encountered %v", poc.port, closeErr))
	}
	return warnings, errorList
}

// ImagePullCheck will pull container images used by node-servant
type ImagePullCheck struct {
	// runtime is used to query for and pull images.
	runtime components.ContainerRuntimeForImage
	// imageList holds the images to process, in order.
	imageList []string
	// imagePullPolicy selects whether images are never pulled, pulled only
	// when missing, or always pulled.
	imagePullPolicy v1.PullPolicy
}

// Name returns the fixed identifier of this check.
func (ImagePullCheck) Name() string {
	return "ImagePull"
}

// Check walks imageList and applies imagePullPolicy to every image:
//   - PullNever: the image is skipped.
//   - PullIfNotPresent: the image is pulled unless the runtime reports it as
//     present; a failed presence query is recorded as an error and the pull
//     is attempted anyway.
//   - PullAlways: the image is pulled.
//   - anything else: an "unsupported pull policy" error is recorded and the
//     check stops at the first image.
//
// Pull failures are recorded as errors and do not stop the loop. The check
// never produces warnings.
func (ipc ImagePullCheck) Check() (warnings, errorList []error) {
	policy := ipc.imagePullPolicy
	klog.V(1).Infof("using image pull policy: %s", policy)

	for _, img := range ipc.imageList {
		// Settle the cases that need no pull attempt first.
		switch policy {
		case v1.PullNever:
			klog.V(1).Infof("skipping pull of image: %s", img)
			continue
		case v1.PullIfNotPresent, v1.PullAlways:
			// handled below
		default:
			// If the policy is unknown return early with an error
			errorList = append(errorList, errors.Errorf("unsupported pull policy %q", policy))
			return warnings, errorList
		}

		if policy == v1.PullIfNotPresent {
			present, existsErr := ipc.runtime.ImageExists(img)
			if existsErr == nil && present {
				klog.V(1).Infof("image exists: %s", img)
				continue
			}
			if existsErr != nil {
				errorList = append(errorList, errors.Wrapf(existsErr, "failed to check if image %s exists", img))
			}
			// Proceed with pulling the image if it does not exist
		}

		klog.V(1).Infof("pulling: %s", img)
		if pullErr := ipc.runtime.PullImage(img); pullErr != nil {
			errorList = append(errorList, errors.Wrapf(pullErr, "failed to pull image %s", img))
		}
	}
	return warnings, errorList
}

// RunConvertNodeChecks runs the root-privilege check on its own first and
// fails fast if it does not pass. It then runs the kubeadm config file,
// kubeadm flags env file, directory and YurtHub port checks; the tunnel agent
// port check is added when deployTunnel is true. Warnings go to os.Stderr.
func RunConvertNodeChecks(o KubePathOperator, ignorePreflightErrors sets.String, deployTunnel bool) error {
	rootErr := RunRootCheckOnly(ignorePreflightErrors)
	if rootErr != nil {
		return rootErr
	}

	checks := []Checker{
		FileAtLeastOneExistingCheck{Paths: o.GetKubeadmConfPaths(), Label: "KubeadmConfig"},
		FileExistingCheck{Path: o.GetKubeAdmFlagsEnvFile(), Label: "KubeAdmFlagsEnv"},
		DirExistingCheck{Path: KubernetesDir},
		DirExistingCheck{Path: KubeletPkiDir},
	}

	// Port checks follow the file and directory checks, in this order.
	ports := []int{YurtHubProxySecurePort, YurtHubProxyPort, YurtHubPort}
	if deployTunnel {
		ports = append(ports, YurttunnelAgentPort)
	}
	for _, p := range ports {
		checks = append(checks, PortOpenCheck{port: p})
	}

	return RunChecks(checks, os.Stderr, ignorePreflightErrors)
}

// RunRootCheckOnly runs only the IsPrivilegedUserCheck through RunChecks,
// writing warnings to os.Stderr.
func RunRootCheckOnly(ignorePreflightErrors sets.String) error {
	return RunChecks([]Checker{IsPrivilegedUserCheck{}}, os.Stderr, ignorePreflightErrors)
}

// RunPullImagesCheck builds a container runtime for the operator's CRI socket
// and runs an ImagePullCheck over the operator's image list and pull policy.
// Failing to build the runtime is returned as is; warnings go to os.Stderr.
func RunPullImagesCheck(o ImageOperator, ignorePreflightErrors sets.String) error {
	rt, err := components.NewContainerRuntimeForImage(utilsexec.New(), o.GetCRISocket())
	if err != nil {
		return err
	}

	pull := ImagePullCheck{
		runtime:         rt,
		imageList:       o.GetImageList(),
		imagePullPolicy: o.GetImagePullPolicy(),
	}
	return RunChecks([]Checker{pull}, os.Stderr, ignorePreflightErrors)
}

// RunChecks runs each check in order, writes its warnings to ww, and once all
// checks are processed returns an *Error aggregating every error, or nil if
// there was none. Errors of a check whose name is selected by
// ignorePreflightErrors (see setHasItemOrAll) are reported as warnings instead.
func RunChecks(checks []Checker, ww io.Writer, ignorePreflightErrors sets.String) error {
	var fatal bytes.Buffer

	for _, check := range checks {
		checkName := check.Name()
		warns, failures := check.Check()

		// Decrease severity of errors to warnings for an ignored check
		if setHasItemOrAll(ignorePreflightErrors, checkName) {
			warns = append(warns, failures...)
			failures = nil
		}

		for _, warn := range warns {
			line := fmt.Sprintf("\t[WARNING %s]: %v\n", checkName, warn)
			io.WriteString(ww, line)
		}
		for _, failure := range failures {
			fmt.Fprintf(&fatal, "\t[ERROR %s]: %v\n", checkName, failure.Error())
		}
	}

	if fatal.Len() == 0 {
		return nil
	}
	return &Error{Msg: fatal.String()}
}

// setHasItemOrAll reports whether the set contains the special key "all" or
// the lower-cased form of item. Only item is lower-cased; the set entries are
// compared as they are.
func setHasItemOrAll(s sets.String, item string) bool {
	return s.Has("all") || s.Has(strings.ToLower(item))
}

// isNodeReady reports whether the status carries a NodeReady condition whose
// status is ConditionTrue. A missing condition counts as not ready.
func isNodeReady(status *v1.NodeStatus) bool {
	_, ready := nodeutil.GetNodeCondition(status, v1.NodeReady)
	if ready == nil {
		return false
	}
	return ready.Status == v1.ConditionTrue
}
